1 module jupyter.wire.kernel; 2 3 import jupyter.wire.message: Message; 4 5 enum protocolVersion = "5.3.0"; 6 7 /** 8 So users don't have to write their own main 9 */ 10 mixin template Main(Backend) { 11 int main(string[] args) { 12 import jupyter.wire.log: log; 13 try { 14 run!Backend(args); 15 return 0; 16 } catch(Exception e) { 17 log("Error: ", e.toString); 18 return 1; 19 } catch(Error e) { 20 log("FATAL ERROR: ", e.toString); 21 return 2; 22 } 23 } 24 } 25 26 void run(Backend)(in string[] args) { 27 import jupyter.wire.kernel: Kernel; 28 import std.exception: enforce; 29 30 const exeName = args.length > 0 ? args[0] : "<exeName>"; 31 enforce(args.length == 2, "Usage: " ~ exeName ~ " <connectionFileName>"); 32 33 const connectionFileName = args[1]; 34 auto backend = Backend(); 35 auto k = kernel(backend, connectionFileName); 36 k.run; 37 } 38 39 40 struct LanguageInfo { 41 string name; 42 string version_; 43 string fileExtension; 44 string mimeType; 45 } 46 47 48 struct ExecutionResult { 49 string result; 50 string stdout; 51 string mime = "text/plain"; 52 } 53 54 55 struct Stdout { 56 string value; 57 } 58 59 alias IoPubMessageSender = void delegate(Message) @safe; 60 61 ExecutionResult textResult(string result, Stdout stdout = Stdout("")) @safe pure nothrow { 62 return ExecutionResult(result, stdout.value, "text/plain"); 63 } 64 65 ExecutionResult markdownResult(string result, Stdout stdout = Stdout("")) @safe pure nothrow { 66 return ExecutionResult(result, stdout.value, "text/markdown"); 67 } 68 69 70 template isBackend(T) { 71 enum isBackend = is(typeof({ 72 LanguageInfo info = T.init.languageInfo; 73 scope IoPubMessageSender sender = (Message){}; 74 ExecutionResult result = T.init.execute("foo", sender); 75 })); 76 } 77 78 79 auto kernel(Backend, Args...)(Backend backend, auto ref Args args) { 80 return Kernel!Backend(backend, args); 81 } 82 83 84 /** 85 Implements a generic Jupyter kernel. 86 Parameterised by a `Backend` type that knows how to execute code. 87 88 */ 89 struct Kernel(Backend) if(isBackend!Backend) { 90 91 import jupyter.wire.connection: ConnectionInfo, Sockets; 92 import zmqd: Socket; 93 import std.typecons: Nullable; 94 95 private Backend backend; 96 private Sockets sockets; 97 private int executionCount = 1; 98 private bool stop; 99 100 this(Backend backend, in string connectionFileName) { 101 import jupyter.wire.connection: fileNameToConnectionInfo; 102 this(backend, fileNameToConnectionInfo(connectionFileName)); 103 } 104 105 this(Backend backend, ConnectionInfo connectionInfo) { 106 import std.traits : hasMember; 107 import jupyter.wire.log: log; 108 109 log("Jupyter kernel starting with connection info ", connectionInfo); 110 111 this.backend = backend; 112 this.sockets = Sockets(connectionInfo); 113 static if (hasMember!(Backend, "initialize")) 114 this.backend.initialize(); 115 } 116 117 void run() { 118 import jupyter.wire.connection: recvRequestMessage; 119 import std.datetime: msecs; 120 import core.thread: Thread; 121 122 for(;!stop;) { 123 maybeHandleRequestMessage(sockets.shell.recvRequestMessage); 124 maybeHandleRequestMessage(sockets.control.recvRequestMessage); 125 () @trusted { Thread.sleep(10.msecs); }(); 126 } 127 } 128 129 void maybeHandleRequestMessage(Nullable!Message requestMessage) { 130 if(requestMessage.isNull) return; 131 132 version(JupyterLogVerbose) { 133 import jupyter.wire.log: log; 134 log("Received message from the front-end."); 135 } 136 137 handleRequestMessage(requestMessage.get); 138 } 139 140 void handleRequestMessage(Message requestMessage) { 141 142 import jupyter.wire.message: statusMessage, pubMessage; 143 import jupyter.wire.log: log; 144 import std.json : JSONValue, parseJSON; 145 146 version(JupyterLogVerbose) log("Sending busy message to the FE"); 147 auto busyMsg = statusMessage(requestMessage.header, "busy"); 148 sockets.send(sockets.ioPub, busyMsg); 149 150 scope(exit) { 151 version(JupyterLogVerbose) log("Sending idle message to the FE"); 152 auto idleMsg = statusMessage(requestMessage.header, "idle"); 153 sockets.send(sockets.ioPub, idleMsg); 154 } 155 156 switch(requestMessage.header.msgType) { 157 158 default: return; 159 160 case "complete_request": 161 handleCompleteRequest(requestMessage); 162 return; 163 164 case "shutdown_request": 165 version(JupyterLogVerbose) log("Told by the FE to shutdown"); 166 handleShutdown(requestMessage); 167 return; 168 169 case "kernel_info_request": 170 version(JupyterLogVerbose) log("Asked by the FE to return kernel info"); 171 handleKernelInfoRequest(requestMessage); 172 return; 173 174 case "execute_request": 175 version(JupyterLogVerbose) log("Told by the FE to execute code"); 176 handleExecuteRequest(requestMessage); 177 return; 178 179 case "comm_open": 180 version(JupyterLogVerbose) log("Told by the FE to open a comm"); 181 handleCommOpen(requestMessage); 182 return; 183 184 case "comm_msg": 185 version(JupyterLogVerbose) log("Received a comm msg from the FE"); 186 handleCommMessage(requestMessage); 187 return; 188 189 case "comm_close": 190 version(JupyterLogVerbose) log("Told by the FE to close a comm"); 191 handleCommClose(requestMessage); 192 return; 193 } 194 195 assert(0); 196 } 197 198 void handleCommOpen(Message requestMessage) { 199 import std.traits: hasMember; 200 import jupyter.wire.message: commCloseMessage; 201 202 void closeComm() { 203 sockets.send(sockets.ioPub, commCloseMessage(requestMessage)); 204 } 205 206 static if (!hasMember!(Backend, "commOpen")) { 207 closeComm(); 208 } else { 209 try { 210 scope sender = (Message msg){ 211 msg.parentHeader = requestMessage.header; 212 sockets.send(sockets.ioPub, msg); 213 }; 214 215 if (!backend.commOpen(requestMessage.content["comm_id"].str, 216 requestMessage.content["target_name"].str, 217 requestMessage.metadata, 218 requestMessage.content["data"], 219 sender)) 220 closeComm(); 221 } catch (Exception e) { 222 closeComm(); 223 throw e; 224 } 225 } 226 } 227 228 void handleCommMessage(Message requestMessage) { 229 import std.traits: hasMember; 230 231 static if (hasMember!(Backend, "commMessage")) { 232 scope sender = (Message msg){ 233 msg.parentHeader = requestMessage.header; 234 sockets.send(sockets.ioPub, msg); 235 }; 236 backend.commMessage(requestMessage.content["comm_id"].str, 237 requestMessage.content["data"], 238 sender); 239 } 240 } 241 242 void handleCommClose(Message requestMessage) { 243 import std.traits: hasMember; 244 245 static if (hasMember!(Backend, "commClose")) { 246 scope sender = (Message msg){ 247 msg.parentHeader = requestMessage.header; 248 sockets.send(sockets.ioPub, msg); 249 }; 250 backend.commClose(requestMessage.content["comm_id"].str, 251 requestMessage.content["data"], 252 sender); 253 254 } 255 } 256 257 void handleShutdown(Message requestMessage) { 258 // TODO: restart 259 // The content of the request is just {"restart": bool} so we reuse it 260 // for the reply. 261 auto replyMessage = Message(requestMessage, "shutdown_reply", requestMessage.content); 262 sockets.send(sockets.control, replyMessage); 263 stop = true; 264 } 265 266 void handleKernelInfoRequest(Message requestMessage) { 267 import std.json: JSONValue; 268 import std.traits : hasMember; 269 270 JSONValue[string] languageInfo; 271 languageInfo["name"] = backend.languageInfo.name; 272 languageInfo["version"] = backend.languageInfo.version_; 273 languageInfo["file_extension"] = backend.languageInfo.fileExtension; 274 static if (hasMember!(LanguageInfo,"mimeType")) 275 languageInfo["mimetype"] = backend.languageInfo.mimeType; 276 277 JSONValue kernelInfo; 278 kernelInfo["status"] = "ok"; 279 kernelInfo["protocol_version"] = protocolVersion; 280 kernelInfo["implementation"] = "foo"; 281 kernelInfo["implementation_version"] = "0.0.1"; 282 kernelInfo["language_info"] = languageInfo; 283 static if (hasMember!(Backend, "banner")) 284 kernelInfo["banner"] = backend.banner; 285 286 auto replyMessage = Message(requestMessage, "kernel_info_reply", kernelInfo); 287 sockets.send(sockets.shell, replyMessage); 288 } 289 290 void handleCompleteRequest(Message requestMessage) { 291 import jupyter.wire.message: completeMessage; 292 import std.traits: hasMember; 293 294 static if (hasMember!(Backend, "complete")) { 295 const result = backend.complete(requestMessage.content["code"].str, 296 requestMessage.content["cursor_pos"].integer); 297 auto msg = completeMessage(requestMessage, result); 298 sockets.send(sockets.shell, msg); 299 } 300 } 301 302 void handleExecuteRequest(Message requestMessage) { 303 import jupyter.wire.message: pubMessage; 304 import std.json: JSONValue, parseJSON, JSONType; 305 import std.conv: text; 306 307 scope(exit) { 308 if(requestMessage.content["store_history"].type == JSONType.true_) 309 ++executionCount; 310 } 311 312 { 313 JSONValue content; 314 content["execution_count"] = executionCount; 315 content["code"] = requestMessage.content["code"]; 316 auto msg = pubMessage(requestMessage.header, "execute_input", content); 317 sockets.send(sockets.ioPub, msg); 318 } 319 320 try { 321 scope sender = (Message msg){ 322 msg.parentHeader = requestMessage.header; 323 sockets.send(sockets.ioPub, msg); 324 }; 325 const result = backend.execute(requestMessage.content["code"].str, sender); 326 sockets.stdout(requestMessage.header, result.stdout); 327 328 { 329 JSONValue content; 330 content["execution_count"] = executionCount; 331 content["data"] = JSONValue(); 332 content["data"][result.mime] = result.result; 333 content["metadata"] = parseJSON(`{}`); 334 sockets.publish(requestMessage.header, "execute_result", content); 335 } 336 337 { 338 JSONValue content; 339 content["status"] = "ok"; 340 content["execution_count"] = executionCount; 341 content["user_variables"] = parseJSON(`{}`); 342 content["user_expressions"] = parseJSON(`{}`); 343 content["payload"] = parseJSON(`[]`); 344 auto replyMessage = Message(requestMessage, "execute_reply", content); 345 sockets.send(sockets.shell, replyMessage); 346 } 347 348 } catch(Exception e) { 349 350 sockets.stdout(requestMessage.header, text("Error: ", e.msg)); 351 352 { 353 JSONValue content; 354 content["status"] = "error"; 355 content["execution_count"] = executionCount; 356 content["ename"] = typeid(e).name; 357 content["evalue"] = e.msg; 358 content["traceback"] = text(e); 359 360 auto replyMessage = Message(requestMessage, "execute_reply", content); 361 sockets.send(sockets.shell, replyMessage); 362 } 363 } 364 } 365 }